Add Greenbids Real Time Data Module #3242

Open
wants to merge 60 commits into
base: master

EvgeniiMunin
Contributor

@EvgeniiMunin EvgeniiMunin commented Jun 13, 2024

🔧 Type of changes

  • new bid adapter
  • update bid adapter
  • new feature
  • new analytics adapter
  • new module
  • bugfix
  • documentation
  • configuration
  • tech debt (test coverage, refactorings, etc.)

✨ What's the context?

The Greenbids Real Time Data (RTD) module filters the SSP bidders listed in imp[].ext.prebid.bidder of the bid request at the processed-auction-request stage.

🧠 Rationale behind the change

Artefacts and caching
To perform the filtering, the module uses an ML pipeline that outputs the probability of a bid per bidder for each imp of the given BidRequest. This probability is then compared with a threshold chosen to ensure the target True Positive Rate (TPR) defined for each partner publisher. The RTD module therefore uses 2 artefacts, both fetched from the Greenbids Google Cloud Storage (GCS) bucket:

  • ML predictor in .onnx format
  • Probability thresholds in .json format with the list of thresholds and their corresponding TPRs

On the Greenbids side, these artefacts are updated in storage every 15 min. The ModelCache and ThresholdCache classes cache them on the RTD side using Caffeine, with a cache expiration delay of 15 min. Fetched artefacts are put into the cache under their corresponding keys. When a new thread with a BidRequest is hooked by the module, we look the artefacts up in the cache:

  • on a cache hit, we proceed with prediction and filtering
  • on a cache miss, the thread calls tryLock to fetch the artefacts from GCS. If another thread arrives while the mutex is already locked, it skips the fetch and the filtering, and the RTD hook returns an InvocationResult with no_action and the BidRequest unchanged

Once the artefacts are available, we build a ThrottlingMessage with the features necessary for inference. The inference time for a single row of data is up to 0.2 ms in the normal regime and 3.5 ms on the first call after fetching (warmup).
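The hit/miss flow above can be sketched with JDK classes only. This is a minimal illustration, not the module's code: TryLockArtefactCache, its fetcher function and its clock are hypothetical names, a ConcurrentHashMap with a write timestamp stands in for the Caffeine cache, and the sketch assumes the thread that wins the lock goes on to use the artefact it has just fetched.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical stand-in for ModelCache / ThresholdCache; a plain map replaces Caffeine. */
final class TryLockArtefactCache<V> {

    /** Cached value together with the time it was written. */
    private static final class Entry<T> {
        final T value;
        final long writtenAtMillis;

        Entry(T value, long writtenAtMillis) {
            this.value = value;
            this.writtenAtMillis = writtenAtMillis;
        }
    }

    private final Map<String, Entry<V>> cache = new ConcurrentHashMap<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final long ttlMillis;
    private final LongSupplier clock;          // e.g. System::currentTimeMillis
    private final Function<String, V> fetcher; // stands in for the GCS download

    TryLockArtefactCache(long ttlMillis, LongSupplier clock, Function<String, V> fetcher) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
        this.fetcher = fetcher;
    }

    /** Returns the artefact, or null when the caller should skip filtering (no_action). */
    V get(String key) {
        final Entry<V> entry = cache.get(key);
        if (entry != null && clock.getAsLong() - entry.writtenAtMillis < ttlMillis) {
            return entry.value; // cache hit: proceed with prediction and filtering
        }
        if (!lock.tryLock()) {
            return null; // another thread holds the lock and is already fetching
        }
        try {
            final V fetched = fetcher.apply(key);
            cache.put(key, new Entry<>(fetched, clock.getAsLong()));
            return fetched; // assumption: the fetching thread uses the fresh artefact
        } finally {
            lock.unlock();
        }
    }
}
```

With a TTL of 900000 ms (15 min), the first call fetches, later calls within the window are served from the map, and the first call after expiry fetches again.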

Communication with AnalyticsReporter and Exploration
The RTD module also communicates the filtering results to the GreenbidsAnalyticsReporter via AnalyticsTags. For each Imp, we populate the AnalyticsResult of the AnalyticsTag with

  • fingerprint (greenbidsId),
  • isKeptInAuction, a map of booleans indicating for each bidder whether we keep it in the auction for the given imp,
  • isExploration, a flag that is necessary to isolate the training log

The AnalyticsTag is then parsed by the AnalyticsReporter from ExtBidResponsePrebid, and its content is added to the analytics payload sent to the Greenbids server.

The Exploration part of the traffic is selected randomly per bid request, with the ratio defined for each partner publisher, and is not filtered by the RTD module.
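A minimal sketch of such a random split, assuming a uniform draw in [0, 1) is compared with the partner's explorationRate. The class ExplorationSplit and its methods are hypothetical names used for illustration only; the random source is injected so the decision can be tested deterministically.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.DoubleSupplier;

/** Hypothetical helper: decides per bid request whether it belongs to exploration traffic. */
final class ExplorationSplit {

    private final DoubleSupplier uniform; // must return values in [0, 1)

    ExplorationSplit(DoubleSupplier uniform) {
        this.uniform = uniform;
    }

    /** Production-style factory backed by ThreadLocalRandom. */
    static ExplorationSplit withThreadLocalRandom() {
        return new ExplorationSplit(() -> ThreadLocalRandom.current().nextDouble());
    }

    /** True when the request should bypass filtering and be logged as exploration. */
    boolean isExploration(double explorationRate) {
        return uniform.getAsDouble() < explorationRate;
    }
}
```

With explorationRate = 0.001, roughly one request in a thousand would skip filtering under this scheme; a rate of 0 never explores, because no draw is below 0.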

Publisher settings
We have considered 2 options for handling the settings of the activated partner publishers without adding them to the code base:

  • add the list of authorized partners with their configs (target TPR, Exploration Rate, pbuid) to the common Prebid config prebid-config-with-modules.yaml and update it by manually sending changes to the PBS team
  • let publishers add their configs directly into bid-request.json, where they activate our modules in the bid request extensions bid-request.ext.prebid.analytics.greenbids and bid-request.ext.prebid.analytics.greenbids-rtd. We use the analytics extension for both modules because there is no RTD extension in ExtRequestPrebid, so this is a workaround that carries both modules' configs under different names, something like analytics.greenbids-analytics and analytics.greenbids-rtd

Currently, the 2nd option is implemented in the RTD module.

"ext": {
  "prebid": {
    "targeting": {
      "includewinners": true,
      "includebidderkeys": true
    },
    "analytics": {
      "greenbids": {
        "pbuid": "PBUID_FROM_GREENBIDS",
        "greenbidsSampling": 1
      },
      "greenbids-rtd": {
        "pbuid": "PBUID_FROM_GREENBIDS",
        "targetTpr": 0.95,
        "explorationRate": 0.001
      }
    }
  }
}

🧪 Test plan

We are testing the following cases:

  • shouldExitEarlyIfPartnerNotActivatedInBidRequest: the partner is not activated, i.e. the extension bidrequest.ext.prebid.analytics.greenbids-rtd is not defined -> we skip filtering and return FutureSucceeded without modifying the BidRequest
  • shouldExitEarlyIfModelIsNotAvailable: the ONNX model for the partner is neither in the cache nor in the GCS bucket, or the artefact could not be initialised -> we skip filtering and return FutureSucceeded without modifying the BidRequest
  • shouldExitEarlyIfThresholdIsNotAvailable: same as above, for the thresholds
  • shouldNotFilterBiddersAndReturnAnalyticsTagWhenExploration: in exploration, the model still does the inference but the BidRequest stays unchanged. We populate the AnalyticsTag with the inference results and send it to the AnalyticsReporter, marking all bidders in the result as isKeptInAuction = true
  • shouldFilterBiddersBasedOnModelResults: the nominal case, where the module operates end to end. We filter the bidders, return the updated BidRequest with modified bidrequest.imp[].ext.prebid.bidder, and send AnalyticsTags to the AnalyticsReporter
  • shouldFilterBiddersBasedOnModelIfAnyFeatureNotAvailable: if for any reason some features are not available in the BidRequest (e.g. the user IP address or device is null) -> we do the inference as in the nominal case, filling the absent features with empty strings
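The last case, filling absent features with empty strings, can be illustrated with a small sketch. InferenceRowSketch and toRow are hypothetical names, and the feature order shown in the usage note is an assumption, not the module's actual feature list.

```java
import java.util.Objects;

/** Hypothetical helper: builds one inference row, replacing missing features with "". */
final class InferenceRowSketch {

    private InferenceRowSketch() {
    }

    /** Copies the features in order; a null feature becomes an empty string. */
    static String[] toRow(String... features) {
        final String[] row = new String[features.length];
        for (int i = 0; i < features.length; i++) {
            row[i] = Objects.toString(features[i], "");
        }
        return row;
    }
}
```

For example, toRow("FR", null, "Chrome") yields a three-element row whose second element is the empty string, so the model still receives a value for every feature.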

🏎 Quality check

  • Are your changes following our code style guidelines?
  • Are there any breaking changes in your code?
  • Does your test coverage exceed 90%?
  • Are there any erroneous console logs, debuggers or leftover code in your changes?

@EvgeniiMunin EvgeniiMunin changed the title Add Greenbids RTD Module Add Greenbids Real Time Data Module Aug 21, 2024
@EvgeniiMunin EvgeniiMunin marked this pull request as ready for review August 21, 2024 13:59
Collaborator

@AntoxaAntoxic AntoxaAntoxic left a comment

They are definitely not the last comments, but at least a bunch of them might lead to a lot of changes, so it makes sense to continue the review once they are tackled

.expireAfterWrite(cacheExpirationMinutes, TimeUnit.MINUTES)
.build();

final ReentrantLock lock = new ReentrantLock();
Collaborator

Regarding the locking, my suggestion is the following:

  • get rid of locking threads
  • if the value in cache is expired
    • remove the expired data from cache
    • fetch the actual data and put it in cache asynchronously
    • while any other thread sees the cache is empty - skip the module logic
    • the very first fetching might be done on the very first request ideally OR on the app start up alternatively

Contributor Author

@EvgeniiMunin EvgeniiMunin Aug 27, 2024

Have removed the reentrant locks. Added CompletableFuture.runAsync to fetch from GCS in case of cache miss + added Executors.newSingleThreadExecutor to make only one thread access GCS (to avoid race condition)

Now the logic is as follows

  • thread -> if cache miss -> fetch from GCS -> cache.put -> return null and skip filtering
  • thread -> if cache hit -> cache.getIfPresent -> return OnnxModelRunner and ThrottlingThresholds -> do filtering

The very first call when cache is empty is handled in the same way as cache miss -> cache.getIfPresent returns null and we fetch the artefacts and skip filtering after

Contributor Author

When the data expires in cache, I suppose the cache is invalidated and when we do cache.getIfPresent(cacheKey) it should return null. So it should be handled automatically by cache with expiration ?

Collaborator

@AntoxaAntoxic AntoxaAntoxic Aug 28, 2024

@EvgeniiMunin I didn't understand your take on avoiding race conditions. If you point to the cache as a shared resource for threads you shouldn't worry since that's the main point of having in-memory cache where all the fetch data is preserved until expiration. I don't see any race condition problem here.

What I see is that you created a bottleneck for all the threads, which would wait until each of them could fetch the data one by one. As I understand it, GCS returns the same data for all requests for 15 min, correct? If so, and you want to minimize the number of calls to GCS, you should make a synchronization/notification point where you decide whether GCS should be called or not.

For example, you can use atomic boolean to notify all the threads that one thread started fetching process.

Another question: does the GCS support async execution? It's worth using them.

Contributor Author

Added an AtomicBoolean isFetching flag and changed executorService to newCachedThreadPool. So now the fetch is not blocking for other threads.

By default the flag is set to false. When the thread is fetching from GCS it is switched to true with compareAndSet. So other threads are not blocked and skip. When the fetching is done the flag is set back to false.

Concerning the GCS docs, I don't see any functionality specifically for async download of objects
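The isFetching flag described above can be sketched as follows. This is an illustration under assumptions, not the PR's code: AsyncArtefactCache and its fetcher are hypothetical names, a ConcurrentHashMap stands in for the Caffeine cache (so expiry is not modelled), and the Executor is injected so that any thread pool can be plugged in.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

/** Hypothetical sketch: at most one in-flight fetch; callers never block. */
final class AsyncArtefactCache<V> {

    private final Map<String, V> cache = new ConcurrentHashMap<>(); // stand-in for Caffeine
    private final AtomicBoolean isFetching = new AtomicBoolean(false);
    private final Function<String, V> fetcher; // stands in for the blocking GCS download
    private final Executor executor;

    AsyncArtefactCache(Function<String, V> fetcher, Executor executor) {
        this.fetcher = fetcher;
        this.executor = executor;
    }

    /** Returns the cached artefact, or null when the caller should skip filtering. */
    V get(String key) {
        final V cached = cache.get(key);
        if (cached != null) {
            return cached; // cache hit
        }
        // Only the thread that flips the flag from false to true schedules a fetch.
        if (isFetching.compareAndSet(false, true)) {
            CompletableFuture
                    .supplyAsync(() -> fetcher.apply(key), executor)
                    .thenAccept(value -> cache.put(key, value))
                    // Reset the flag on success and on failure so a later miss can retry.
                    .whenComplete((ignored, error) -> isFetching.set(false));
        }
        return null; // cache miss: skip filtering for this request
    }
}
```

Every caller that sees a miss returns immediately; only one of them schedules the download, and the flag is cleared whether the download succeeds or fails.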

Collaborator

@EvgeniiMunin Okay, if GCS doesn't provide any async interface then please wrap its call with vertx.executeBlocking. It will use the vertx-based thread management of PBS. Also it's fine to operate with vertx Futures since the module eventually returns a Future.

Contributor Author

extra/modules/greenbids-real-time-data/pom.xml (outdated, resolved)
throw new PreBidException("Cache was empty, fetching and put artefacts for next request");
}

final GreenbidsInferenceData greenbidsInferenceData = new GreenbidsInferenceData(jacksonMapper, database);
Collaborator

I believe this one can also be created once

P.S. I've already asked you to check which objects can be created only once, so once more, double-check the code and define such objects as beans

Contributor Author

@EvgeniiMunin EvgeniiMunin Sep 2, 2024

For this one I've modified the class with the Builder annotation and a static prepareData method that returns the built GreenbidsInferenceData, because the fields are private and final and we need to recreate it at each call with different throttlingMessages and throttlingInferenceRows. So GreenbidsInferenceData has to be created at each call

I've rechecked the other objects created at hook call

  • FilterService moved to GreenbidsRealTimeDataConfig as a bean
  • GreenbidsInvocationResult. Same as for the GreenbidsInferenceData class: here we need to rebuild the object with different private final updatedBidRequest, invocationAction, analyticsResult, so it has to be kept at hook call

Comment on lines 123 to 130
final GreenbidsInferenceData greenbidsInferenceData = GreenbidsInferenceData
.of(bidRequest, dbReader, mapper);

impsBiddersFilterMap = filterService.filterBidders(
onnxModelRunner,
greenbidsInferenceData.getThrottlingMessages(),
threshold);
Collaborator

I'd probably make a service bean from the GreenbidsInferenceData with the method like parseData(BidRequest bidRequest) (please come up with better name) that returns List<ThrottlingMessage>

in that way you can remove dbReader from hook's dependencies

Contributor Author

Added Service annotation to GreenbidsInferenceDataService and then defined it in config GreenbidsRealTimeDataConfiguration as Bean. Named the method that outputs List<ThrottlingMessage> extractThrottlingMessagesFromBidRequest. Removed dbReader from hook field, as it is added to GreenbidsInferenceDataService bean

Comment on lines 40 to 46
final ModelCache modelCache = new ModelCache(
onnxModelPath,
storage,
gcsBucketName,
modelCacheWithExpiration,
onnxModelCacheKeyPrefix,
vertx);
Collaborator

Creating a new ModelCache each time is counterintuitive

Create a ModelCache bean and use modelCache.get(onnxModelPath, partner.getPbuid())

Contributor Author

Defined ModelCache and ThresholdCache as beans; the paths are now provided at the get methods

@Service
public class GreenbidsInferenceDataService {

@Autowired
Member

Redundant annotation, since you create bean yourself inside spring config.

Contributor Author

@EvgeniiMunin EvgeniiMunin Sep 17, 2024

Removed the redundant Autowired annotation. Also removed the Service annotation from GreenbidsInferenceDataService because otherwise the DatabaseReader bean is not injected properly. Kept GreenbidsInferenceDataService and DatabaseReader as beans in the Spring config

@EvgeniiMunin
Contributor Author

@And1sS @AntoxaAntoxic The fixes on comments are applied, could you please have a look

Comment on lines +24 to +28
return truePositiveRates.stream()
.filter(truePositiveRate -> truePositiveRate >= targetTpr)
.map(truePositiveRate -> thresholds.get(truePositiveRates.indexOf(truePositiveRate)))
.max(Comparator.naturalOrder())
.orElse(0.0);
Collaborator

            return IntStream.range(0, truePositiveRates.size())
                    .filter(i -> truePositiveRates.get(i) >= targetTpr)
                    .mapToObj(thresholds::get)
                    .max(Comparator.naturalOrder())
                    .orElse(0.0);

Collaborator

Is IndexOutOfBound possible there?
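On the index question: in the IntStream variant, thresholds.get(i) would throw IndexOutOfBoundsException whenever thresholds is shorter than truePositiveRates. A minimal sketch of a guarded version, assuming the two lists are meant to be aligned by position, bounds the range by the shorter list. ThresholdSelection and pick are hypothetical names.

```java
import java.util.List;
import java.util.stream.IntStream;

/** Hypothetical helper: picks the highest threshold whose TPR still meets the target. */
final class ThresholdSelection {

    private ThresholdSelection() {
    }

    static double pick(List<Double> thresholds, List<Double> truePositiveRates, double targetTpr) {
        // Bound the index range by the shorter list so get(i) cannot go out of bounds.
        final int size = Math.min(thresholds.size(), truePositiveRates.size());
        return IntStream.range(0, size)
                .filter(i -> truePositiveRates.get(i) >= targetTpr)
                .mapToDouble(thresholds::get)
                .max()
                .orElse(0.0); // no qualifying threshold: fall back to 0.0 as in the original
    }
}
```

Silently truncating to the shorter list is one possible choice; rejecting artefacts whose lists differ in length when they are loaded would be a stricter alternative.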

Comment on lines +56 to +57
DatabaseReader dbReader,
ObjectMapper mapper) {
Collaborator

Remove the static modifier from this method and from the others that depend on DatabaseReader and ObjectMapper. Just use field access where it's needed.

|| PC_OS_FAMILIES.contains(osFamily())
|| ("Windows".equals(osFamily()) && "ME".equals(osMajor()))
|| ("Mac OS X".equals(osFamily()) && !userAgent.contains("Silk"))
|| userAgent.contains("Linux") && userAgent.contains("X11"))
Collaborator

I think you missed some parentheses.
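For context on the parentheses: in Java, && binds tighter than ||, so an unparenthesised a || b && c is evaluated as a || (b && c). The last clause of the quoted condition therefore already groups that way, and explicit parentheses mainly make the intent visible and consistent with the clauses above it. A small sketch with hypothetical method names:

```java
/** Demonstrates that && has higher precedence than || in Java. */
final class PrecedenceDemo {

    private PrecedenceDemo() {
    }

    /** Written without parentheses, as in the last clause of the reviewed condition. */
    static boolean implicit(boolean a, boolean b, boolean c) {
        return a || b && c;
    }

    /** Same expression with the grouping spelled out. */
    static boolean explicit(boolean a, boolean b, boolean c) {
        return a || (b && c);
    }

    /** The other possible reading, which is NOT what Java evaluates by default. */
    static boolean leftGrouped(boolean a, boolean b, boolean c) {
        return (a || b) && c;
    }
}
```

The two readings differ for a = true, b = false, c = false: implicit and explicit return true, while leftGrouped returns false.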


return StreamSupport.stream(results.spliterator(), false)
.filter(onnxItem -> Objects.equals(onnxItem.getKey(), "probabilities"))
.map(onnxItem -> (OnnxTensor) onnxItem.getValue())
Collaborator

Can we add some validations? instanceof, validate size of probabilities, etc.
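One way such validations could look, shown on a plain Object so the sketch stays independent of the ONNX Runtime classes. It assumes the "probabilities" output unwraps to a float[][] with one row per inference row and a fixed number of class columns; both the class name ProbabilityValidation and the expected shape are assumptions to check against the actual model.

```java
/** Hypothetical validation of a model output before it is used for filtering. */
final class ProbabilityValidation {

    private ProbabilityValidation() {
    }

    /** Returns the value as float[][] or throws IllegalArgumentException with a reason. */
    static float[][] validate(Object value, int expectedRows, int expectedColumns) {
        if (!(value instanceof float[][])) {
            final String actual = value == null ? "null" : value.getClass().getSimpleName();
            throw new IllegalArgumentException("Expected float[][] probabilities but got " + actual);
        }
        final float[][] probabilities = (float[][]) value;
        if (probabilities.length != expectedRows) {
            throw new IllegalArgumentException(
                    "Expected " + expectedRows + " rows but got " + probabilities.length);
        }
        for (float[] row : probabilities) {
            if (row == null || row.length != expectedColumns) {
                throw new IllegalArgumentException("Each row must have " + expectedColumns + " columns");
            }
        }
        return probabilities;
    }
}
```

In the hook, a failed validation would presumably be treated like a missing artefact, i.e. filtering is skipped and the BidRequest is left unchanged.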

Comment on lines +10 to +25
public class OnnxModelRunner {

private OrtSession session;

private OrtEnvironment environment;

public OnnxModelRunner(byte[] onnxModelBytes) throws OrtException {
environment = OrtEnvironment.getEnvironment();
final OrtSession.SessionOptions options = new OrtSession.SessionOptions();
session = environment.createSession(onnxModelBytes, options);
}

public OrtSession.Result runModel(String[][] throttlingInferenceRow) throws OrtException {
final OnnxTensor inputTensor = OnnxTensor.createTensor(OrtEnvironment.getEnvironment(), throttlingInferenceRow);
return session.run(Collections.singletonMap("input", inputTensor));
}
Collaborator

We'll need the environment anyway, so let's load it with the class instead of loading it when lots of requests try to do it and get blocked.

public class OnnxModelRunner {

    private static final OrtEnvironment ENVIRONMENT = OrtEnvironment.getEnvironment();
    
    private final OrtSession session;

    public OnnxModelRunner(byte[] onnxModelBytes) throws OrtException {
        session = ENVIRONMENT.createSession(onnxModelBytes, new OrtSession.SessionOptions());
    }

    public OrtSession.Result runModel(String[][] throttlingInferenceRow) throws OrtException {
        final OnnxTensor inputTensor = OnnxTensor.createTensor(ENVIRONMENT, throttlingInferenceRow);
        return session.run(Collections.singletonMap("input", inputTensor));
    }

Collaborator

And I have a question: is the model implementation (session) immutable? If so, we can reuse a single instance.

Comment on lines +25 to +37
String gcsBucketName;

Cache<String, ThrottlingThresholds> cache;

Storage storage;

ObjectMapper mapper;

String thresholdsCacheKeyPrefix;

AtomicBoolean isFetching;

Vertx vertx;
Collaborator

Please, add private final modifiers for all fields in all services.

Comment on lines 75 to 81
vertx.executeBlocking(promise -> {
final Blob blob = getBlob(thresholdJsonPath);
promise.complete(blob);
})
.map(blob -> loadThrottlingThresholds((Blob) blob))
.onSuccess(thresholds -> cache.put(cacheKey, thresholds))
.onFailure(error -> logger.error("Failed to fetch thresholds"));
Collaborator

Same here.

String greenbidsId,
Boolean isExploration) {

final String tid = imp.getExt().get("tid").asText();
Collaborator

Possible NPE
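The NPE can occur when imp.getExt() is null or has no "tid" field, since the chained .get("tid").asText() would then be called on null. A null-safe lookup can be sketched with Optional; here a plain Map stands in for the Jackson ObjectNode so that the example stays JDK-only, and TidExtraction is a hypothetical name.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical null-safe lookup of the transaction id from an imp extension. */
final class TidExtraction {

    private TidExtraction() {
    }

    /** Returns the tid when the extension exists and holds a string under "tid". */
    static Optional<String> tid(Map<String, Object> impExt) {
        return Optional.ofNullable(impExt)
                .map(ext -> ext.get("tid"))      // an absent key maps to an empty Optional
                .filter(String.class::isInstance)
                .map(String.class::cast);
    }
}
```

The caller can then decide explicitly what a missing tid means, for example tid(ext).orElse("") or skipping the imp.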

Comment on lines +27 to +30
final List<Imp> impsWithFilteredBidders = updateImps(bidRequest, impsBiddersFilterMap);
final BidRequest updatedBidRequest = isExploration
? bidRequest
: bidRequest.toBuilder().imp(impsWithFilteredBidders).build();
Collaborator

        final BidRequest updatedBidRequest = isExploration
                ? bidRequest
                : bidRequest.toBuilder()
                .imp(updateImps(bidRequest, impsBiddersFilterMap))
                .build();

.<AuctionRequestPayload>builder()
.status(InvocationStatus.success)
.action(action)
.payloadUpdate(payload -> AuctionRequestPayloadImpl.of(bidRequest))
Collaborator

No need for payloadUpdate if action isn't update.

4 participants